data-emitter-base

Contains a set of base classes and interfaces to minimize
boilerplate code when integrating data sources.
IDataEmitter
This is the primary interface that all emitters implement.
Usage
Subscribe to events
import {IDataEmitter} from '@curium.rocks/data-emitter-base';

// ADataEmitterImplementation stands in for any concrete IDataEmitter.
const emitter: IDataEmitter = new ADataEmitterImplementation();

const onDataListener = emitter.onData({
  onData: (evt) => {
    // Strings are primitives, so narrow with typeof rather than instanceof.
    if (typeof evt.data === 'string') {
      console.log(`data: ${evt.data}`);
    }
  }
});

const onStatusListener = emitter.onStatus({
  onStatus: (evt) => {
    if (evt.bit) {
      console.log("emitter is in a failed state");
    } else {
      console.log("emitter is in a healthy state");
    }
    if (evt.connected) {
      console.log("emitter is connected to its data source");
    } else {
      console.log("emitter is disconnected from its data source");
    }
  }
});

// Dispose of the listeners once they are no longer needed.
onDataListener.dispose();
onStatusListener.dispose();
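The subscribe-then-dispose pattern above can be illustrated without the library. The following is a minimal, self-contained sketch and not the package's implementation: `ListenerRegistry`, `Disposable`, `DataListener`, and `DataEvent` are hypothetical stand-ins, and the real interfaces may carry more fields.

```typescript
// Hypothetical stand-ins for illustration; not the library's actual types.
interface DataEvent {
  data: unknown;
}

interface DataListener {
  onData(evt: DataEvent): void;
}

interface Disposable {
  dispose(): void;
}

class ListenerRegistry {
  private listeners: DataListener[] = [];

  // Register a listener and return a handle that unregisters it.
  add(listener: DataListener): Disposable {
    this.listeners.push(listener);
    return {
      dispose: () => {
        this.listeners = this.listeners.filter((l) => l !== listener);
      },
    };
  }

  // Deliver an event to every listener that is still registered.
  notify(evt: DataEvent): void {
    // Iterate over a copy so a listener may dispose itself during delivery.
    for (const listener of this.listeners.slice()) {
      listener.onData(evt);
    }
  }

  get size(): number {
    return this.listeners.length;
  }
}

const registry = new ListenerRegistry();
const received: unknown[] = [];
const handle = registry.add({
  onData: (evt) => {
    received.push(evt.data);
  },
});

registry.notify({ data: "first" });  // delivered
handle.dispose();
registry.notify({ data: "second" }); // not delivered, listener was disposed
```

Returning a handle from the subscribe call keeps the unsubscribe logic next to the subscription, so callers do not need to hold a reference to the original listener object.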
On demand fetch
import {IDataEmitter, IDataEvent, IStatusEvent} from '@curium.rocks/data-emitter-base';

// ADataEmitterImplementation stands in for any concrete IDataEmitter.
const emitter: IDataEmitter = new ADataEmitterImplementation();

// Top-level await requires an ES module; otherwise wrap this in an async function.
const latestData: IDataEvent = await emitter.probeCurrentData();
const latestStatus: IStatusEvent = await emitter.probeStatus();

// Strings are primitives, so narrow with typeof rather than instanceof.
if (typeof latestData.data === 'string') {
  console.log(`data: ${latestData.data}`);
}
if (latestStatus.bit) {
  console.log("emitter in failed state");
} else {
  console.log("emitter in healthy state");
}
if (latestStatus.connected) {
  console.log("emitter connected");
} else {
  console.log("emitter disconnected");
}
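Because an event payload may be of any type, it helps to narrow it before use. The sketch below assumes the payload is typed as `unknown`; `describePayload` is a hypothetical helper and not part of the package. It uses `typeof` for primitives and `instanceof` only for objects created by a constructor.

```typescript
// Hypothetical helper; assumes the event payload is typed as `unknown`.
function describePayload(data: unknown): string {
  // typeof is the right check for primitives such as strings and numbers.
  if (typeof data === "string") {
    return `text: ${data}`;
  }
  if (typeof data === "number") {
    return `number: ${data}`;
  }
  // instanceof is appropriate for objects created by a constructor.
  if (data instanceof Uint8Array) {
    return `bytes: ${data.length}`;
  }
  return "unsupported payload";
}

console.log(describePayload("SIGINT"));
console.log(describePayload(new Uint8Array(4)));
```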
BaseEmitter
The BaseEmitter class provides an optional implementation
of the generic portions of the IDataEmitter interface to reduce
repetitive code across emitters.
Usage
Extending to wrap event emitter
import {
  BaseEmitter,
  ICommand,
  IDataEvent,
  IExecutionResult,
  ISettings,
  IStatusEvent,
  ITraceableAction
} from "@curium.rocks/data-emitter-base";

class SignalEmitter extends BaseEmitter {
  // Most recent event, kept so probeCurrentData() can answer on demand.
  private lastDataEvent?: IDataEvent;

  constructor(id: string, name: string, desc: string) {
    super(id, name, desc);
    // Emit a data event each time the process receives SIGINT.
    process.on("SIGINT", () => {
      this.lastDataEvent = this.buildDataEvent("SIGINT");
      this.notifyDataListeners(this.lastDataEvent);
    });
  }

  applySettings(settings: ISettings & ITraceableAction): Promise<IExecutionResult> {
    return Promise.reject(new Error("Not Implemented"));
  }

  probeCurrentData(): Promise<IDataEvent> {
    if (!this.lastDataEvent) return Promise.reject(new Error("data unavailable"));
    return Promise.resolve(this.lastDataEvent);
  }

  probeStatus(): Promise<IStatusEvent> {
    return Promise.reject(new Error("Not Implemented"));
  }

  sendCommand(command: ICommand): Promise<IExecutionResult> {
    return Promise.reject(new Error("Not Implemented"));
  }

  getMetaData(): unknown {
    throw new Error("Not Implemented");
  }
}
PollingEmitter
The PollingEmitter provides a common point for all emitters
that require timed polling to fetch data.
Usage
Extending to watch a file
import {ICommand, IExecutionResult, PollingEmitter} from "@curium.rocks/data-emitter-base";
import fs from 'fs';

class FilePollingEmitter extends PollingEmitter {
  poll(): Promise<unknown> {
    return new Promise((resolve, reject) => {
      fs.readFile('./test.txt', 'utf8', (err, data) => {
        if (err) {
          return reject(err);
        }
        resolve(data);
      });
    });
  }

  sendCommand(command: ICommand): Promise<IExecutionResult> {
    return Promise.resolve({
      success: true,
      actionId: command.actionId
    });
  }

  getMetaData(): unknown {
    return {
      example: "example-val"
    };
  }
}
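To show what timed polling involves, here is a minimal, self-contained sketch of an interval-driven poller. It is not the library's PollingEmitter: `IntervalPoller`, `CounterPoller`, `PollListener`, `pollOnce`, `start`, and `stop` are hypothetical names chosen for illustration, and the real class may schedule and report errors differently.

```typescript
// Hypothetical sketch of interval polling; not the library's PollingEmitter.
type PollListener = (data: unknown) => void;

abstract class IntervalPoller {
  private timer?: ReturnType<typeof setInterval>;
  private listeners: PollListener[] = [];
  private readonly intervalMs: number;

  constructor(intervalMs: number) {
    this.intervalMs = intervalMs;
  }

  // Subclasses supply the actual fetch, much like poll() in the example above.
  abstract poll(): Promise<unknown>;

  onData(listener: PollListener): void {
    this.listeners.push(listener);
  }

  // Run one poll cycle and pass the result to every listener.
  async pollOnce(): Promise<void> {
    const data = await this.poll();
    for (const listener of this.listeners) {
      listener(data);
    }
  }

  start(): void {
    if (this.timer !== undefined) return; // already running
    this.timer = setInterval(() => {
      // Log errors so one failed poll does not stop the schedule.
      this.pollOnce().catch((err) => console.error("poll failed", err));
    }, this.intervalMs);
  }

  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }

  get running(): boolean {
    return this.timer !== undefined;
  }
}

// Example subclass: each poll returns an incrementing counter.
class CounterPoller extends IntervalPoller {
  private count = 0;

  poll(): Promise<unknown> {
    this.count += 1;
    return Promise.resolve(this.count);
  }
}
```

Keeping `pollOnce` separate from the timer lets callers trigger a single fetch on demand and makes the poll logic easy to exercise without waiting for the interval.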
Integrations
Check here for more integrations that implement these interfaces.